import os
import shutil
import zipfile
import urllib.request
# Fetch the LinearizedNNs repository as a zip, unpack a fresh copy, and put
# its src/ directory on the import path.
REPO_ZIP_FILE = 'LinearizedNNs-master.zip'
REPO_PATH = "LinearizedNNs-master"

urllib.request.urlretrieve(
    'https://github.com/maxkvant/LinearizedNNs/archive/master.zip',
    REPO_ZIP_FILE,
)

# Start from a clean checkout: drop any previously extracted copy.
if os.path.exists(REPO_PATH):
    shutil.rmtree(REPO_PATH)

with zipfile.ZipFile(REPO_ZIP_FILE, 'r') as zip_ref:
    zip_ref.extractall('.')
assert os.path.exists(REPO_PATH)

import sys
sys.path.append(f"{REPO_PATH}/src")
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torchvision import transforms, datasets
from torchvision.datasets import FashionMNIST
from pytorch_impl.nns import ResNet, FCN, CNN
from pytorch_impl.nns import warm_up_batch_norm
from pytorch_impl.estimators import LinearizedSgdEstimator, SgdEstimator, MatrixExpEstimator
from pytorch_impl import ClassifierTraining
from pytorch_impl.matrix_exp import matrix_exp, compute_exp_term
from pytorch_impl.nns.utils import to_one_hot
# Fix the RNG for reproducibility, then pick the compute device.
torch.manual_seed(0)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

print(f'Torch version: {torch.__version__}')
print(f'Device: {device}')

# FashionMNIST images are D x D pixels with 10 target classes.
D = 28
num_classes = 10
# FashionMNIST train/test loaders; raw ToTensor only, no normalization.
_fmnist_train = FashionMNIST(root='.', train=True, download=True,
                             transform=transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(
    _fmnist_train,
    batch_size=4096, shuffle=True, pin_memory=True)

_fmnist_test = FashionMNIST(root='.', train=False,
                            transform=transforms.ToTensor())
test_loader = torch.utils.data.DataLoader(
    _fmnist_test,
    batch_size=4096, shuffle=True, pin_memory=True)
# Sanity check of the repo's matrix-exponential helper on the generator of a
# 30-degree rotation: exp(a * [[0,-1],[1,0]]) should be a 2-D rotation matrix.
a = 30 / 180 * np.pi
M = a * torch.tensor([[0, -1], [1, 0]])
matrix_exp(M, device)

# Same check via the series helper: M @ compute_exp_term(M) + I == exp(M).
a = 30 / 180 * np.pi
M = a * torch.tensor([[0, -1], [1, 0]])
M_clone = M.clone().to(device)
torch.matmul(M_clone, compute_exp_term(M, device)) + torch.eye(2).to(device)
# Smoke-test CNN construction, then fit a MatrixExpEstimator on one batch
# and report its test-set accuracy.
CNN(1, input_channels=1)

X, y = next(iter(train_loader))
X.size()

model = CNN(1, input_channels=1, num_channels=256).to(device)
warm_up_batch_norm(model, train_loader, device)

estimator = MatrixExpEstimator(model, num_classes, device,
                               learning_rate=1e1, momentum=0.)

X, y = next(iter(train_loader))
X, y = X.to(device), y.to(device)
estimator.fit(X, y)
ClassifierTraining(estimator, device).get_accuracy(test_loader)
def get_estimator(model, num_classes):
    """Build a fresh MatrixExpEstimator for *model* on the module-level device."""
    fresh = MatrixExpEstimator(model, num_classes, device, learning_rate=10.)
    return fresh
# Fit one fresh estimator per mini-batch and average their weights into a
# single estimator, then measure its test accuracy.
# NOTE(review): detach() shares storage with the first estimator's ws, so the
# in-place += below mutates that tensor — harmless here only because that
# estimator is immediately discarded.
ws_sum = get_estimator(model, num_classes).ws.detach()
prev_size = None
batches = 0
for X, y in train_loader:
    # Stop at the first batch of a different (i.e. smaller, final) size so
    # every averaged fit saw the same number of samples.
    if prev_size is not None and len(X) != prev_size:
        break
    prev_size = len(X)
    X, y = X.to(device), y.to(device)
    per_batch = get_estimator(model, num_classes)
    per_batch.fit(X, y)
    ws_sum += per_batch.ws.detach()
    batches += 1

estimator = get_estimator(model, num_classes)
estimator.ws = ws_sum / batches
ClassifierTraining(estimator, device).get_accuracy(test_loader)
# Baseline: plain SGD training of a 10-output CNN on FashionMNIST.
model = CNN(10, input_channels=1).to(device)
warm_up_batch_norm(model, train_loader, device)
learning_rate = .005
estimator = SgdEstimator(model, nn.CrossEntropyLoss(), learning_rate)
training = ClassifierTraining(estimator, device)
training.train(train_loader, test_loader, num_epochs=200, learning_rate=learning_rate)
# Linearized SGD on a fully connected net (inputs flattened to D*D = 784),
# compared against a plain-SGD FCN run at the same learning rate / epochs.
learning_rate = .02
linearized_estimator = LinearizedSgdEstimator(FCN(1, D * D).to(device), num_classes, nn.CrossEntropyLoss(), learning_rate)
linearized_training = ClassifierTraining(linearized_estimator, device)
linearized_training.train(train_loader, test_loader, num_epochs=10, learning_rate=learning_rate)
# Rebinds the module-level `estimator` / `training`; the cells below operate
# on this FCN SgdEstimator.
estimator = SgdEstimator(FCN(10, D * D).to(device), nn.CrossEntropyLoss(), learning_rate)
training = ClassifierTraining(estimator, device)
training.train(train_loader, test_loader, num_epochs=10, learning_rate=learning_rate)
# Fit the SGD estimator on one training batch and inspect its predictions.
X, y = next(iter(train_loader))
X, y = X.to(device), y.to(device)
estimator.fit(X, y)
print(estimator.predict(X).size())

# Single-batch train MSE against one-hot targets.
((estimator.predict(X) - to_one_hot(y, num_classes).to(device)) ** 2).mean()
estimator

# Single-batch test accuracy.
X, y = next(iter(test_loader))
X, y = X.to(device), y.to(device)
(torch.argmax(estimator.predict(X), dim=1) == y).double().mean()
# Re-select the compute device (restart point for the CIFAR-10 section).
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Per-channel CIFAR-10 normalization statistics (mean/std over the train set).
cifar10_stats = {
    "mean": (0.4914, 0.4822, 0.4465),
    "std": (0.24705882352941178, 0.24352941176470588, 0.2615686274509804),
}
# Training-time augmentation: reflect-pad to 40x40, random 32x32 crop,
# random horizontal flip, then tensorize and normalize.
transform_train = transforms.Compose([
    transforms.Lambda(lambda img: np.asarray(img)),
    transforms.Lambda(
        lambda arr: np.pad(arr, [(4, 4), (4, 4), (0, 0)], mode='reflect')),
    transforms.Lambda(lambda arr: Image.fromarray(arr)),
    transforms.RandomCrop(32),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(cifar10_stats['mean'], cifar10_stats['std']),
])

# Test-time: tensorize and normalize only, no augmentation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar10_stats['mean'], cifar10_stats['std']),
])
# CIFAR-10 loaders with the transforms defined above.
train_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(root='./data', train=True, download=True,
                     transform=transform_train),
    batch_size=2048, shuffle=True, pin_memory=True)

test_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(root='./data', train=False, download=True,
                     transform=transform_test),
    batch_size=2048, shuffle=True, pin_memory=True)

device
model = CNN(64, 1).to(device)
model

# One full forward pass over the training data — presumably to let the
# batch-norm running statistics settle before they are frozen below; confirm.
# (Keeps the original direct .forward() call, which bypasses Module hooks.)
for X, _ in train_loader:
    model.forward(X.to(device))
def set_bn_eval(m):
    """Switch any batch-norm module to eval mode (freezes running statistics).

    Intended to be passed to ``Module.apply``; modules whose class name does
    not contain 'BatchNorm' are left untouched.
    """
    # Name-based check keeps this independent of the concrete BatchNorm
    # variant (1d/2d/3d, synced, etc.).
    # Fix: removed a leftover debug print that emitted one line per module
    # on every .apply() traversal.
    if 'BatchNorm' in m.__class__.__name__:
        m.eval()
# Freeze every batch-norm layer, then fit an estimator on one CIFAR-10 batch.
model.apply(set_bn_eval)
model
_, (X, y) = next(enumerate(train_loader))
X = X.to(device)
# NOTE(review): to_one_hot is called without num_classes here, unlike the
# earlier to_one_hot(y, num_classes) call — confirm the helper has a default.
y = to_one_hot(y).to(device)
print(y.size())
print(X.size())
# NOTE(review): `Estimator` is not defined anywhere in this file (earlier
# cells use MatrixExpEstimator / SgdEstimator) — presumably provided by a
# cell not shown here; verify before running.
estimator = Estimator(model, device, learning_rate=.05, step=2048)
estimator.fit(X, y)
# Single-batch train MSE against the one-hot targets.
((estimator.predict(X) - y) ** 2).mean()
# NOTE(review): get_accuracy is also undefined in this file — verify.
get_accuracy(estimator, test_loader)
def gen_estimator():
    """Create a fresh Estimator bound to the module-level model and device."""
    new_estimator = Estimator(model, device, learning_rate=.03, step=2048)
    return new_estimator
# Boosting rounds over mini-batches.
# NOTE(review): boost_estimator is not defined anywhere in this file —
# presumably supplied by a cell not shown here; verify before running.
boosted_estimator = boost_estimator(None, gen_estimator, train_loader, batch_limit=30, learning_rate=1.)
boosted_estimator
get_accuracy(boosted_estimator, test_loader)
# NOTE(review): the return value is discarded here, unlike the first call —
# presumably boost_estimator updates its first argument in place; confirm.
boost_estimator(boosted_estimator, gen_estimator, train_loader, batch_limit=30, learning_rate=.2)
boosted_estimator
get_accuracy(boosted_estimator, test_loader)
# A third, smaller round at a lower learning rate.
boost_estimator(boosted_estimator, gen_estimator, train_loader, batch_limit=10, learning_rate=.1)
get_accuracy(boosted_estimator, test_loader)
import copy
# Snapshot the current ensemble before further experimentation.
boosted_estimator_old = copy.deepcopy(boosted_estimator)